[SPARK-56651][CONNECT][SDP] Add Python APIs for Auto CDC SCD Type 1#56045
anew wants to merge 3 commits into
- Remove spaces around = in keyword arguments (PEP 8)
- Fix type hint: List[Union[str, Column]] -> Union[List[str], List[Column]]
- Reorder imports and collapse unnecessary line continuations

Co-authored-by: Isaac
AnishMahto
left a comment
Only real comment is to drop ignore null API for now. LGTM.
        assert sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")

    def test_create_auto_cdc_flow(self):
        from pyspark.sql.connect.functions.builtin import col, expr
non-blocking nit: can just lift imports out of each individual test
        self.assertEqual(sink_obj.options["key1"], "value1")
        assert sink_obj.source_code_location.filename.endswith("test_graph_element_registry.py")

    def test_create_auto_cdc_flow(self):
non-blocking nit: This test can just be collapsed with test_create_auto_cdc_flow_with_all_args
I guess this tests the behavior with minimal required arguments. But I'd say it should validate that the defaults are correct. Adding that.
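A minimal sketch of what "validate that the defaults are correct" could look like for the minimal-arguments test. `FakeAutoCdcFlow` is a hypothetical stand-in for whatever object the local graph registry records, and the field list is assumed from the parameters visible in this PR's diff context; the real test would inspect the registered flow instead.

```python
import unittest
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeAutoCdcFlow:
    """Hypothetical stand-in for the flow object stored by the registry."""

    keys: List[str]
    sequence_by: str
    apply_as_deletes: Optional[str] = None
    apply_as_truncates: Optional[str] = None
    stored_as_scd_type: Optional[str] = None


class MinimalArgsDefaultsTest(unittest.TestCase):
    def test_defaults_with_minimal_args(self):
        # Only the required arguments are supplied.
        flow = FakeAutoCdcFlow(keys=["id"], sequence_by="ts")

        # Required fields round-trip unchanged.
        self.assertEqual(flow.keys, ["id"])
        self.assertEqual(flow.sequence_by, "ts")

        # Every optional field should be left unset, not silently defaulted.
        self.assertIsNone(flow.apply_as_deletes)
        self.assertIsNone(flow.apply_as_truncates)
        self.assertIsNone(flow.stored_as_scd_type)
```

Asserting on each optional field explicitly keeps this test distinct from the all-arguments test rather than a subset of it.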
    ignore_null_updates_column_list: Optional[Union[List[str], List[Column]]] = None,
    ignore_null_updates_except_column_list: Optional[Union[List[str], List[Column]]] = None,
Let's add these APIs later when ignore null execution support is actually built.
are we not building that?
We will eventually (hopefully soon!), but I'm generally in favor of only adding the API once the feature is built. Otherwise we'll just be throwing a not-supported exception anyway if the user tries specifying an ignore null column selection.
    keys: Union[List[str], List[Column]],
    sequence_by: Union[str, Column],
    apply_as_deletes: Optional[Union[str, Column]] = None,
    apply_as_truncates: Optional[Union[str, Column]] = None,
Just a heads up, there's a good chance we're not going to get apply_as_truncates functionality merged in for the 4.2 cut.
I'll most likely drop this argument when I connect these APIs to the graph registration context on the Spark Connect backend, and then add it back for Spark 4.3+.
No action needed on your side, just giving the heads up.
- Move inline imports to module level
- Fix assertNone -> assertIsNone
- Fix assertEqual(stored_as_scd_type, "1") -> assertIsNone for default case
- Add missing assertions for optional fields in test_create_auto_cdc_flow

Co-authored-by: Isaac
szehon-ho
left a comment
A few nits on validation, tests, imports, and docstring casing.
    )
    ignore_null_updates_except_column_list = _normalize_optional_column_list(
        ignore_null_updates_except_column_list
    )
Doc says mutual exclusion and non-empty keys, but nothing enforces it. Validate after normalization (like other SDP APIs) so users get a clear client error.
    return _normalize_column_list(column_list)


def _normalize_column_list(
Add tests for string args (keys=["id"], sequence_by="ts", etc.), not only Connect col/expr.
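A rough sketch of the kind of string-argument coverage being asked for. To stay runnable without Spark, it uses `FakeColumn` as a hypothetical stand-in for a PySpark Column and `normalize_column_list` as an assumed approximation of the PR's `_normalize_column_list`, whose real body is not shown here.

```python
import unittest
from typing import List, Union


class FakeColumn:
    """Hypothetical stand-in for a PySpark Column."""

    def __init__(self, name: str) -> None:
        self.name = name


def normalize_column_list(
    columns: List[Union[str, FakeColumn]],
) -> List[FakeColumn]:
    """Assumed behavior: wrap plain strings, pass column objects through."""
    return [FakeColumn(c) if isinstance(c, str) else c for c in columns]


class NormalizeColumnListTest(unittest.TestCase):
    def test_string_args(self):
        # Plain strings such as keys=["id"] should be accepted and wrapped.
        normalized = normalize_column_list(["id", "region"])
        self.assertEqual([c.name for c in normalized], ["id", "region"])

    def test_column_args(self):
        # Column objects should be returned unchanged.
        col = FakeColumn("id")
        self.assertIs(normalize_column_list([col])[0], col)
```

In the real suite, the same two cases would call create_auto_cdc_flow with keys=["id"] and sequence_by="ts" and then inspect the registered flow.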
from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
from pyspark.sql.connect.types import pyspark_types_to_proto_types
from pyspark.sql.types import StructType
from pyspark.pipelines.add_pipeline_analysis_context import add_pipeline_analysis_context
Nit: import shuffle only — consider keeping prior order to shrink diff.
    table. These keys also identify records in the target table, e.g., if there exists a record \
    for given keys and the CDC source has an UPSERT operation for the same keys, we will update \
    the existing record. At least one key must be provided. This should be a list of column \
    identifiers without qualifiers, expressed as either Python strings or Pyspark Columns.
Nit: Pyspark → PySpark in this docstring (573, 575, 581-585).
What changes were proposed in this pull request?
Adds create_auto_cdc_flow to the SDP Python API. For now, this will only support SCD Type 1. Parameters:
Why are the changes needed?
See the SPIP at https://docs.google.com/document/d/1Hp5BGEYJRHbk6J7XUph3bAPZKRQXKOuV1PEaqZMMRoQ/
Does this PR introduce any user-facing change?
Yes, it introduces a new method in the SDP Python API.
How was this patch tested?
Unit tests were added, using a local graph registry.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6